package com.hulk.common.support.mq.feature;

import java.io.Serializable;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.TextMessage;

import lombok.extern.slf4j.Slf4j;

import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;

import com.google.common.base.Splitter;
import com.google.common.base.Strings;

/**
 * @author cmt
 * 
 */
@Slf4j
//@Component("sendMessageUtil")
public class SendMessageUtil implements IClientSendMessage {

	private JmsTemplate template;
	private JmsTemplate jmsTemplate4async;
	private Destination destination;
	private Destination respDest;
	private Map<String, Destination> destinationMap;
	private Map<String, Destination> respDestMap;
	/** 以报文中通讯流水为主键的一个线程安全hashmap */
	private static ConcurrentHashMap<String, RecvMessageBean> excuterthreadsmap = new ConcurrentHashMap<String, RecvMessageBean>();

	/**
	 * 无返回的消息--字符串
	 * 
	 * @see com.huateng.p3.common.tools.activemq.IClientSendMessage#aSyncSendMsg(java.lang.String,
	 *      java.lang.String, java.lang.String, java.lang.String[])
	 */
	@Override
	public void aSyncSendMsg(final String seq, final String reqQueueName,
			final String context, final String... expand) {
		aSyncSendMsg(seq, reqQueueName, context, null, expand);
	}
	@Override
	public void aSyncSendMsg(final String seq, final String reqQueueName,
			final String context, final String senderid, final String... expand) {
		destination = destinationMap.get(reqQueueName);
		if (destination == null) {
			destination = destinationMap.get(AppCode.DEFAULTQUEUE);
		}
		jmsTemplate4async.send(destination, new MessageCreator() {
			public Message createMessage(Session session) throws JMSException {
				Message msg = session.createTextMessage(context);
				if(!Strings.isNullOrEmpty(senderid)){
					msg.setStringProperty(AppCode.SENDERID, senderid);
				}
				if (expand != null && expand.length > 0) {
					for (int i = 0; i < expand.length; i++) {
						List<String> exp = Splitter.on('|').trimResults()
								.omitEmptyStrings().splitToList(expand[i]);
						msg.setStringProperty(exp.get(0), exp.get(1));
					}
				}
				msg.setStringProperty(AppCode.REQTXNSEQ, seq);
				msg.setBooleanProperty(AppCode.ISASYNC, true);
				return msg;
			}
		});
	}

	/**
	 * 无返回的消息--对象
	 * 
	 * @see com.huateng.p3.common.tools.activemq.IClientSendMessage#aSyncSendMsg(java.lang.String,
	 *      java.lang.String, java.lang.String, java.lang.String[])
	 */
	@Override
	public void aSyncSendMsg(final String seq, final String reqQueueName,
			final Serializable obj, final String... expand) {
		aSyncSendMsg(seq, reqQueueName, obj, null,  expand);
	}
	@Override
	public void aSyncSendMsg(final String seq, final String reqQueueName,
			final Serializable obj,final String senderid, final String... expand) {
		destination = destinationMap.get(reqQueueName);
		if (destination == null) {
			destination = destinationMap.get(AppCode.DEFAULTQUEUE);
		}
		jmsTemplate4async.send(destination, new MessageCreator() {
			public Message createMessage(Session session) throws JMSException {
				ObjectMessage msg = session.createObjectMessage(obj);
				if(!Strings.isNullOrEmpty(senderid)){
					msg.setStringProperty(AppCode.SENDERID, senderid);
				}
				if (expand != null && expand.length > 0) {
					for (int i = 0; i < expand.length; i++) {
						List<String> exp = Splitter.on('|').trimResults()
								.omitEmptyStrings().splitToList(expand[i]);
						msg.setStringProperty(exp.get(0), exp.get(1));
					}
				}
				msg.setStringProperty(AppCode.REQTXNSEQ, seq);
				msg.setBooleanProperty(AppCode.ISASYNC, true);
				return msg;
			}
		});
	}
	
	/**
	 * 发送信息并获取返回信息 (non-Javadoc)
	 * 
	 * @see com.huateng.p3.common.tools.activemq.IClientSendMessage#sendMsg(java.lang.String,
	 *      java.lang.String, java.lang.String, java.lang.String,
	 *      java.lang.String[])
	 */
	@Override
	public Message sendMsg(final String reqTxnSeq, final String reqQueueName,
			final String rcvQueueName, final String context,
			final String... expand) {
		return sendMsg(reqTxnSeq, reqQueueName, rcvQueueName, context, null,null,
				expand);
	}
	@Override
	public Message sendMsg(final String reqTxnSeq, final String reqQueueName,
			final String rcvQueueName, final String context , final String senderid, final String destsenderid,
			final String... expand) {
		destination = destinationMap.get(reqQueueName);
		respDest = respDestMap.get(rcvQueueName);
		if (destination == null || respDest == null) {
			destination = destinationMap.get(AppCode.DEFAULTQUEUE);
			respDest = respDestMap.get(AppCode.DEFAULTQUEUE);
		}
		RecvMessageBean temprecv = new RecvMessageBean();
		RecvMessageBean exTempRecv = excuterthreadsmap.putIfAbsent(reqTxnSeq,
				temprecv);
		RecvMessageBean tempresult = null;
		// 如果已存在，就打个警告
		if (exTempRecv != null) {
			log.error("{} key duplicated", reqTxnSeq);
		}
		boolean acqureresult = false;
		try {
			template.send(destination, new MessageCreator() {
				public Message createMessage(Session session)
						throws JMSException {
					//Message msg = session.createObjectMessage(context); 
					Message msg = session.createTextMessage(context);
					//msg.getJMSCorrelationID();
					msg.setJMSReplyTo(respDest);
					if(!Strings.isNullOrEmpty(senderid)){
						msg.setStringProperty(AppCode.SENDERID, senderid);
					}else {
						msg.setStringProperty(AppCode.SENDERID, AppCode.INST_ID);
					}
					if(!Strings.isNullOrEmpty(destsenderid)){
						msg.setStringProperty(AppCode.DEST_SENDERID, destsenderid);
					}
					msg.setStringProperty(AppCode.REQTXNSEQ, reqTxnSeq);
					msg.setBooleanProperty(AppCode.ISASYNC, false);
					return msg;
				}
			});

			log.debug("message sended to:{},reqTxnSeq:{},context:{}",
					new Object[] { destination.toString(), reqTxnSeq, context });
			// 挂在该资源上，等待N秒内有回文
			acqureresult = temprecv.getRecvsemap().tryAcquire(
					template.getReceiveTimeout(), TimeUnit.MILLISECONDS);

		} catch (InterruptedException | RuntimeException e) {
			log.error("message sended reqTxnSeq:" + reqTxnSeq, e);
			throw new RuntimeException(e);
		} finally {
			// 从hashmap里拿到收到的消息
			tempresult = excuterthreadsmap.remove(reqTxnSeq);
		}

		if (acqureresult && tempresult != null) {
			Message msg = tempresult.getRecvmsg();
			if (msg != null) {
				log.debug(
						"succeed to receive message from:{},reqTxnSeq:{},returnMsg:{}",
						new Object[] { respDest.toString(), reqTxnSeq, msg });
				return msg;
			} else {
				log.error(
						"receive message time out! return null, reqTxnSeq:{}",
						reqTxnSeq);
				return null;
				// throw new RuntimeException("receive message time out");
			}
		} else {
			log.error(
					"receive message time out! return null, reqTxnSeq:{},map size is:{}",
					new Object[] { reqTxnSeq, excuterthreadsmap.size()});
			// throw new RuntimeException("receive message time out");
			return null;
		}
	}
	
	
	/**
	 * 发送消息<序列化对象>并获得返回消息
	 * @param reqTxnSeq
     * @param reqQueueName
     * @param rcvQueueName
     * @param context
     * @param expand
     * @return 
	 */
	@Override
	public Message sendMsg(final String reqTxnSeq, final String reqQueueName,
			final String rcvQueueName, final Serializable obj,
			final String... expand) {
		return sendMsg(reqTxnSeq, reqQueueName, rcvQueueName, obj, null,null,
				expand);
	}
	@Override
	public Message sendMsg(final String reqTxnSeq, final String reqQueueName,
			final String rcvQueueName, final Serializable obj, final String senderid, final String destsenderid, final String... expand) {
		destination = destinationMap.get(reqQueueName);
		respDest = respDestMap.get(rcvQueueName);
		if (destination == null || respDest == null) {
			destination = destinationMap.get(AppCode.DEFAULTQUEUE);
			respDest = respDestMap.get(AppCode.DEFAULTQUEUE);
		}

		RecvMessageBean temprecv = new RecvMessageBean();
		RecvMessageBean exTempRecv = excuterthreadsmap.putIfAbsent(reqTxnSeq,
				temprecv);
		RecvMessageBean tempresult = null;
		// 如果已存在，就打个警告
		if (exTempRecv != null) {
			log.error("{} key duplicated", reqTxnSeq);
		}
		boolean acqureresult = false;
		try {
			template.send(destination, new MessageCreator() {
				public Message createMessage(Session session)
						throws JMSException {
					Message msg = session.createObjectMessage(obj);
					msg.setJMSReplyTo(respDest);
					if(!Strings.isNullOrEmpty(senderid)){
						msg.setStringProperty(AppCode.SENDERID, senderid);
					}else {
						msg.setStringProperty(AppCode.SENDERID, AppCode.INST_ID);
					}
					if(!Strings.isNullOrEmpty(destsenderid)){
						msg.setStringProperty(AppCode.DEST_SENDERID, destsenderid);
					}
					msg.setStringProperty(AppCode.REQTXNSEQ, reqTxnSeq);
					msg.setBooleanProperty(AppCode.ISASYNC, false);
					return msg;
				}
			});

			log.debug("message sended to:{},reqTxnSeq:{},context:{}",
					new Object[] { destination.toString(), reqTxnSeq, obj });

			// 挂在该资源上，等待N秒内有回文
			acqureresult = temprecv.getRecvsemap().tryAcquire(
					template.getReceiveTimeout(), TimeUnit.MILLISECONDS);
		} catch (InterruptedException | RuntimeException e) {
			log.error("message sended reqTxnSeq:" + reqTxnSeq, e);
			throw new RuntimeException(e);		
		} finally {
			// 从hashmap里拿到收到的消息
			tempresult = excuterthreadsmap.remove(reqTxnSeq);
		}

		if (acqureresult && tempresult != null) {
			Message msg = tempresult.getRecvmsg();
			if (msg != null) {
				log.debug(
						"succeed to receive message from:{},reqTxnSeq:{},returnMsg:{}",
						new Object[] { respDest.toString(), reqTxnSeq, msg });
				return msg;
			} else {
				log.error(
						"receive message time out! return null, reqTxnSeq:{}",
						reqTxnSeq);
				return null;
			}
		} else {
			log.error(
					"receive message time out! return null, reqTxnSeq:{},map size is:{}",
					new Object[] { reqTxnSeq, excuterthreadsmap.size() });
			return null;
		}
	}
		

	/**
	 * 发送信息并获取返回文字信息 (non-Javadoc)
	 * 
	 * @see com.huateng.p3.common.tools.activemq.IClientSendMessage#sendTextRtnMessage(java.lang.String,
	 *      java.lang.String, java.lang.String, java.lang.String,
	 *      java.lang.String[])
	 */
	@Override
	public String sendTextRtnMessage(final String seq,
			final String reqQueueName, final String rcvQueueName,
			final String context) {

		TextMessage msg = (TextMessage) sendMsg(seq, reqQueueName,
				rcvQueueName, context);
		String resp = null;
		try {
			if (msg != null) {
				resp = msg.getText();
			}
		} catch (Exception e) {
			log.error("get text msg from return message result error :", e);
			throw new RuntimeException("", e);
		}
		return resp;
	}

	/**
	 * 发送信息并获取返回序列化信息 (non-Javadoc)
	 * 
	 * @see com.huateng.p3.common.tools.activemq.IClientSendMessage#sendTextRtnMessage(java.lang.String,
	 *      java.lang.String, java.lang.String, java.lang.String,
	 *      java.lang.String[])
	 */
	@Override
	public Serializable sendSerializableRtnMessage(final String seq,
			final String reqQueueName, final String rcvQueueName,
			final Serializable obj) {

		ObjectMessage msg = (ObjectMessage) sendMsg(seq, reqQueueName,
				rcvQueueName, obj);
		Serializable resp = null;
		try {
			if (msg != null) {
				resp = msg.getObject();
			}
		} catch (Exception e) {
			log.error("get serializable msg from return message result error :", e);
			throw new RuntimeException("", e);
		}
		return resp;
	}
	
	@Override
	public String sendTextRtnMessage(String seq, String reqQueueName,
			String rcvQueueName, String context, String senderid,
			String destsenderid) {
		String expand [] = {};
		TextMessage msg = (TextMessage) sendMsg(seq, reqQueueName,
				rcvQueueName, context,senderid, destsenderid,expand);
		String resp = null;
		try {
			if (msg != null) {
				resp = msg.getText();
			}
		} catch (Exception e) {
			log.error("get text msg from return message result error :", e);
			throw new RuntimeException("", e);
		}
		return resp;
	}
	@Override
	public Serializable sendSerializableRtnMessage(String seq,
			String reqQueueName, String rcvQueueName, Serializable obj,
			String senderid, String destsenderid) {
		String expand [] = {};
		ObjectMessage msg = (ObjectMessage) sendMsg(seq, reqQueueName,
				rcvQueueName, obj,senderid, destsenderid,expand);
		Serializable resp = null;
		try {
			if (msg != null) {
				resp = msg.getObject();
			}
		} catch (Exception e) {
			log.error("get serializable msg from return message result error :", e);
			throw new RuntimeException("", e);
		}
		return resp;
	}
	
	public void recvmessagefromserver(Message recvmessage) {
		if (recvmessage == null) {
			log.info("recvmessagefromserver error");
			return;
		}
		/* 得到通讯报文中包号 */

		String key;
		try {
			key = recvmessage.getStringProperty(AppCode.REQTXNSEQ);
			RecvMessageBean temprecv = excuterthreadsmap.get(key);

			if (temprecv != null) {
				temprecv.setRecvmsg(recvmessage);
				temprecv.getRecvsemap().release();
			} else {
				log.info(" map size is [{}] , recvmessagefromserver key [{}]: is null ", new Object[] { excuterthreadsmap.size(),key});
			}
		} catch (JMSException e) {
			log.error("recvmessagefromserver JMSException:", e);
		}
		// 挂在资源上的消息放入，同时唤醒工作线程
	}
	
	/**
	 * @return the template
	 */
	public JmsTemplate getTemplate() {
		return template;
	}

	/**
	 * @param template
	 *            the template to set
	 */
	public void setTemplate(JmsTemplate template) {
		this.template = template;
	}

	/**
	 * @return the destination
	 */
	public Destination getDestination() {
		return destination;
	}

	/**
	 * @param destination
	 *            the destination to set
	 */
	public void setDestination(Destination destination) {
		this.destination = destination;
	}

	/**
	 * @return the respDest
	 */
	public Destination getRespDest() {
		return respDest;
	}

	/**
	 * @param respDest
	 *            the respDest to set
	 */
	public void setRespDest(Destination respDest) {
		this.respDest = respDest;
	}

	

	/**
	 * @return the jmsTemplate4async
	 */
	public JmsTemplate getJmsTemplate4async() {
		return jmsTemplate4async;
	}

	/**
	 * @param jmsTemplate4async
	 *            the jmsTemplate4async to set
	 */
	public void setJmsTemplate4async(JmsTemplate jmsTemplate4async) {
		this.jmsTemplate4async = jmsTemplate4async;
	}

	public Map<String, Destination> getDestinationMap() {
		return destinationMap;
	}

	public void setDestinationMap(Map<String, Destination> destinationMap) {
		this.destinationMap = destinationMap;
	}

	public Map<String, Destination> getRespDestMap() {
		return respDestMap;
	}

	public void setRespDestMap(Map<String, Destination> respDestMap) {
		this.respDestMap = respDestMap;
	}
	

	

}
